/*
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *  http://aws.amazon.com/apache2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

package software.amazon.awssdk.enhanced.dynamodb.functionaltests;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.util.Collections;
import java.util.HashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import software.amazon.awssdk.enhanced.dynamodb.DynamoDbEnhancedClient;
import software.amazon.awssdk.enhanced.dynamodb.DynamoDbTable;
import software.amazon.awssdk.enhanced.dynamodb.TableSchema;
import software.amazon.awssdk.enhanced.dynamodb.extensions.AutoGeneratedTimestampRecordExtension;
import software.amazon.awssdk.enhanced.dynamodb.functionaltests.models.FlattenRecord;
import software.amazon.awssdk.enhanced.dynamodb.internal.client.ExtensionResolver;
import software.amazon.awssdk.enhanced.dynamodb.mapper.testbeans.flattenmap.FlattenMapAndFlattenRecordBean;
import software.amazon.awssdk.enhanced.dynamodb.mapper.testbeans.flattenmap.FlattenMapInvalidBean;
import software.amazon.awssdk.enhanced.dynamodb.mapper.testbeans.flattenmap.FlattenMapValidBean;
import software.amazon.awssdk.enhanced.dynamodb.mapper.testbeans.flattenmap.NestedFlattenMapBean;

/**
 * Functional tests that write and read the test beans from the {@code flattenmap} package through a
 * {@link DynamoDbEnhancedClient} built on the DynamoDB client supplied by {@link LocalDynamoDbSyncTestBase}.
 *
 * <p>A table for {@link FlattenMapValidBean} is created before and deleted after every test. Tests that need a
 * different bean type create and delete their own, separately named table.
 */
public class FlattenMapTest extends LocalDynamoDbSyncTestBase {

    /** Logical name of the per-test table; always resolved through {@code getConcreteTableName}. */
    private static final String TABLE_NAME = "table-name";

    private static final TableSchema<FlattenMapValidBean> TABLE_SCHEMA =
        TableSchema.fromClass(FlattenMapValidBean.class);

    /** Enhanced client using the default extensions plus {@link AutoGeneratedTimestampRecordExtension}. */
    private final DynamoDbEnhancedClient enhancedClient =
        DynamoDbEnhancedClient.builder()
                              .dynamoDbClient(getDynamoDbClient())
                              .extensions(Stream.concat(
                                                    ExtensionResolver.defaultExtensions().stream(),
                                                    Stream.of(AutoGeneratedTimestampRecordExtension.create()))
                                                .collect(Collectors.toList()))
                              .build();

    private final DynamoDbTable<FlattenMapValidBean> mappedTable =
        enhancedClient.table(getConcreteTableName(TABLE_NAME), TABLE_SCHEMA);

    /**
     * Not referenced by any test in this class; expected exceptions are verified with
     * {@code assertThatThrownBy} instead.
     */
    @Rule
    public ExpectedException exception = ExpectedException.none();

    /** Creates the {@link FlattenMapValidBean} table with the default provisioned throughput. */
    @Before
    public void createTable() {
        mappedTable.createTable(r -> r.provisionedThroughput(getDefaultProvisionedThroughput()));
    }

    /** Deletes the table created by {@link #createTable()}. */
    @After
    public void deleteTable() {
        getDynamoDbClient().deleteTable(r -> r.tableName(getConcreteTableName(TABLE_NAME)));
    }

    /**
     * Writes four items with {@code updateItem}, each under its own id, and checks after every write that
     * {@code getItem} returns the root attributes and map entries that were set.
     */
    @Test
    public void updateItemWithFlattenMap_correctlyFlattensMapAttributes() {

        // Item "1": both root attributes and three map entries.
        FlattenMapValidBean record = new FlattenMapValidBean();
        record.setId("1");
        record.setRootAttribute1("rootValue1");
        record.setRootAttribute2("rootValue2");
        record.setAttributesMap(mapOf("mapAttribute1", "mapValue1",
                                      "mapAttribute2", "mapValue2",
                                      "mapAttribute3", "mapValue3"));

        mappedTable.updateItem(record);

        FlattenMapValidBean persistedRecord = mappedTable.getItem(record);
        assertThat(persistedRecord.getId()).isEqualTo("1");
        assertThat(persistedRecord.getRootAttribute1()).isEqualTo("rootValue1");
        assertThat(persistedRecord.getRootAttribute2()).isEqualTo("rootValue2");
        assertThat(persistedRecord.getAttributesMap())
            .hasSize(3)
            .containsEntry("mapAttribute1", "mapValue1")
            .containsEntry("mapAttribute2", "mapValue2")
            .containsEntry("mapAttribute3", "mapValue3");

        // Item "2" (a different key, so a separate item): rootAttribute1 is left unset.
        record = new FlattenMapValidBean();
        record.setId("2");
        record.setRootAttribute2("rootValue2_new");
        record.setAttributesMap(mapOf("mapAttribute1", "mapValue1_new",
                                      "mapAttribute2", "mapValue2_new",
                                      "mapAttribute3", "mapValue3"));

        mappedTable.updateItem(record);

        persistedRecord = mappedTable.getItem(record);
        assertThat(persistedRecord.getId()).isEqualTo("2");
        assertThat(persistedRecord.getRootAttribute1()).isNull();
        assertThat(persistedRecord.getRootAttribute2()).isEqualTo("rootValue2_new");
        assertThat(persistedRecord.getAttributesMap())
            .hasSize(3)
            .containsEntry("mapAttribute1", "mapValue1_new")
            .containsEntry("mapAttribute2", "mapValue2_new")
            .containsEntry("mapAttribute3", "mapValue3");

        // Item "3": both root attributes and three map entries, all with "_new" values.
        record = new FlattenMapValidBean();
        record.setId("3");
        record.setRootAttribute1("rootValue1_new");
        record.setRootAttribute2("rootValue2_new");
        record.setAttributesMap(mapOf("mapAttribute1", "mapValue1_new",
                                      "mapAttribute2", "mapValue2_new",
                                      "mapAttribute3", "mapValue3_new"));

        mappedTable.updateItem(record);

        persistedRecord = mappedTable.getItem(record);
        assertThat(persistedRecord.getId()).isEqualTo("3");
        assertThat(persistedRecord.getRootAttribute1()).isEqualTo("rootValue1_new");
        assertThat(persistedRecord.getRootAttribute2()).isEqualTo("rootValue2_new");
        assertThat(persistedRecord.getAttributesMap())
            .hasSize(3)
            .containsEntry("mapAttribute1", "mapValue1_new")
            .containsEntry("mapAttribute2", "mapValue2_new")
            .containsEntry("mapAttribute3", "mapValue3_new");

        // Item "4": no root attributes and an empty map; the map is expected to read back as null.
        record = new FlattenMapValidBean();
        record.setId("4");
        record.setAttributesMap(new HashMap<>());

        mappedTable.updateItem(record);

        persistedRecord = mappedTable.getItem(record);
        assertThat(persistedRecord.getId()).isEqualTo("4");
        assertThat(persistedRecord.getRootAttribute1()).isNull();
        assertThat(persistedRecord.getRootAttribute2()).isNull();
        assertThat(persistedRecord.getAttributesMap()).isNull();
    }

    /**
     * A map entry keyed {@code "id"} on a bean whose id is also set must make {@code updateItem} fail with an
     * {@link IllegalArgumentException} that names the clashing attribute.
     */
    @Test
    public void updateItemWithFlattenMap_withDuplicateAttributeName_throwsIllegalArgumentException() {
        FlattenMapValidBean record = new FlattenMapValidBean();
        record.setId("1");
        record.setRootAttribute1("rootValue1");
        record.setRootAttribute2("rootValue2");
        record.setAttributesMap(mapOf("mapAttribute1", "mapValue1",
                                      "mapAttribute2", "mapValue2",
                                      "mapAttribute3", "mapValue3",
                                      "id", "newIdValue"));

        assertThatThrownBy(() -> mappedTable.updateItem(record))
            .isInstanceOf(IllegalArgumentException.class)
            .hasMessageContaining("Attempt to add an attribute to a mapper that already has one with the same name. ")
            .hasMessageContaining("[Attribute name: id]");
    }

    /**
     * A map entry keyed {@code "id"} on a bean that also carries a {@link FlattenRecord} with its id set must
     * make {@code updateItem} fail with an {@link IllegalArgumentException}. Uses its own table, which is
     * deleted in a {@code finally} block.
     */
    @Test
    public void updateItemWithFlattenMap_duplicateAttributeInFlattenedRecord_throwsIllegalArgumentException() {
        String composedTableName = getConcreteTableName("table-name-composed");

        TableSchema<FlattenMapAndFlattenRecordBean> composedSchema =
            TableSchema.fromClass(FlattenMapAndFlattenRecordBean.class);
        DynamoDbTable<FlattenMapAndFlattenRecordBean> composedTable =
            enhancedClient.table(composedTableName, composedSchema);
        composedTable.createTable(r -> r.provisionedThroughput(getDefaultProvisionedThroughput()));

        try {
            FlattenRecord flattenRecord = new FlattenRecord();
            flattenRecord.setId("1");

            FlattenMapAndFlattenRecordBean flattenMapAndFlattenRecord = new FlattenMapAndFlattenRecordBean();
            flattenMapAndFlattenRecord.setFlattenRecord(flattenRecord);
            flattenMapAndFlattenRecord.setAttributesMap(Collections.singletonMap("id", "id2"));

            assertThatThrownBy(() -> composedTable.updateItem(flattenMapAndFlattenRecord))
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessageContaining("Attempt to add an attribute to a mapper that already has one with the same name.");
        } finally {
            getDynamoDbClient().deleteTable(r -> r.tableName(composedTableName));
        }
    }

    /**
     * Building a schema for {@link FlattenMapInvalidBean} must fail with an {@link IllegalArgumentException}
     * reporting multiple flattened map properties. No table access is involved; the failure is expected from
     * {@code TableSchema.fromClass} itself.
     */
    @Test
    public void updateItemWithFlattenMap_withMultipleAnnotatedMaps_throwsIllegalArgumentException() {
        assertThatThrownBy(() -> TableSchema.fromClass(FlattenMapInvalidBean.class))
            .isInstanceOf(IllegalArgumentException.class)
            .hasMessageContaining("Multiple @DynamoDbFlatten Map<String, String> properties found. "
                                  + "Only one flattened map per class is supported.");
    }

    /**
     * Writes a {@link NestedFlattenMapBean} whose nested object carries a two-entry map and checks that the
     * root attribute, the nested attribute and both map entries are read back. Uses its own table, which is
     * deleted in a {@code finally} block.
     */
    @Test
    public void updateItemWithNestedFlattenMap_correctlyFlattensMapAttributes() {
        String nestedTableName = getConcreteTableName("nested-table-name");

        TableSchema<NestedFlattenMapBean> nestedSchema = TableSchema.fromClass(NestedFlattenMapBean.class);
        DynamoDbTable<NestedFlattenMapBean> nestedTable = enhancedClient.table(nestedTableName, nestedSchema);
        nestedTable.createTable(r -> r.provisionedThroughput(getDefaultProvisionedThroughput()));

        try {
            NestedFlattenMapBean record = new NestedFlattenMapBean();
            record.setId("1");
            record.setRootAttribute("rootValue");

            NestedFlattenMapBean.NestedClassWithMap nestedClass = new NestedFlattenMapBean.NestedClassWithMap();
            nestedClass.setNestedAttribute("nestedValue");
            nestedClass.setNestedMap(mapOf("nestedMapKey1", "nestedMapValue1",
                                           "nestedMapKey2", "nestedMapValue2"));
            record.setNestedClass(nestedClass);

            nestedTable.updateItem(record);

            NestedFlattenMapBean persistedRecord = nestedTable.getItem(record);
            assertThat(persistedRecord.getId()).isEqualTo("1");
            assertThat(persistedRecord.getRootAttribute()).isEqualTo("rootValue");
            assertThat(persistedRecord.getNestedClass().getNestedAttribute()).isEqualTo("nestedValue");
            assertThat(persistedRecord.getNestedClass().getNestedMap())
                .hasSize(2)
                .containsEntry("nestedMapKey1", "nestedMapValue1")
                .containsEntry("nestedMapKey2", "nestedMapValue2");
        } finally {
            getDynamoDbClient().deleteTable(r -> r.tableName(nestedTableName));
        }
    }

    /**
     * Builds a mutable {@link HashMap} from alternating keys and values.
     *
     * <p>Replaces double-brace initialization, which would create an anonymous {@code HashMap} subclass per
     * call site that also holds a reference to the enclosing test instance.
     *
     * @param keysAndValues alternating key, value, key, value, ...; may be empty
     * @return a new plain {@code HashMap} containing the given pairs
     * @throws IllegalArgumentException if an odd number of arguments is supplied
     */
    private static HashMap<String, String> mapOf(String... keysAndValues) {
        if (keysAndValues.length % 2 != 0) {
            throw new IllegalArgumentException("Expected key/value pairs but got an odd number of arguments: "
                                               + keysAndValues.length);
        }
        HashMap<String, String> result = new HashMap<>();
        for (int i = 0; i < keysAndValues.length; i += 2) {
            result.put(keysAndValues[i], keysAndValues[i + 1]);
        }
        return result;
    }
}